---
title: How to upgrade to Prefect 3.0
sidebarTitle: Upgrade to Prefect 3.0
description: Learn how to upgrade from Prefect 2.x to Prefect 3.0.
---

Prefect 3.0 introduces a number of enhancements to the OSS product: a new events & automations backend for event-driven workflows and observability, improved runtime performance, autonomous task execution and a streamlined caching layer based on transactional semantics.

The majority of these enhancements maintain compatibility with most Prefect 2.0 workflows, but there are a few caveats that you may need to adjust for.

To learn more about the enhanced performance and new features, see [What's new in Prefect 3.0](/v3/get-started/whats-new-prefect-3).

For the majority of users, upgrading to Prefect 3.0 will be a seamless process that requires few or no code changes.
This guide highlights key changes that you may need to consider when upgrading.

<Info>
**Prefect 2.0** refers to the 2.x lineage of the open source prefect package, and **Prefect 3.0** refers exclusively to the 3.x lineage of the prefect package. Neither version is strictly tied to any aspect of Prefect's commercial product, [Prefect Cloud](/v3/how-to-guides/cloud/connect-to-cloud).
</Info>

## Quickstart

To upgrade to Prefect 3.0, run:

```bash
pip install -U prefect
```

If you self-host a Prefect server, run this command to update your database:

```bash
prefect server database upgrade
```

If you use a Prefect integration or extra, remember to upgrade it as well. For example:

```bash
pip install -U 'prefect[aws]'
```

## Upgrade notes

### Pydantic V2

Prefect 3.0 is built with Pydantic 2.0 for improved performance. All Prefect objects will automatically upgrade, but if you use custom Pydantic models for flow parameters or custom blocks, you'll need to ensure they are compatible with Pydantic 2.0. You can continue to use Pydantic 1.0 models in your own code if they do not interact directly with Prefect.

Refer to [Pydantic's migration guide](https://docs.pydantic.dev/latest/migration/) for detailed information on necessary changes.

<Warning>
We recommend pausing all deployment schedules prior to upgrading.
Because of differences in Pydantic datetime handling that affect the scheduler's idempotency logic, there is a small risk of the scheduler duplicating runs in its first loop.
</Warning>

### Module location and name changes

Some less-commonly used modules have been renamed, reorganized, or removed for clarity. The old import paths will continue to be supported for 6 months, but emit deprecation warnings. You can look at the [deprecation code](https://github.com/PrefectHQ/prefect/blob/main/src/prefect/_internal/compatibility/migration.py) to see a full list of affected paths.

### Async tasks in synchronous flows

<info>
This change affects you if: you use advanced asynchronous behaviors in your flows.
</info>

In Prefect 2.0, it was possible to call native `async` tasks from synchronous flows, a pattern that is not normally supported in Python. Prefect 3.0.0 removes this behavior to reduce complexity and potential issues and edge cases. If you relied on asynchronous tasks in synchronous flows, you must either make your flow asynchronous or use a task runner that supports asynchronous execution.

### Flow final states

<info>
This change affects you if: you want your flow to fail if any task in the flow fails, and you invoke your tasks in a way that doesn't automatically raise an error (including submitting them to a `TaskRunners`).
</info>

In Prefect 2.0, the final state of a flow run was influenced by the states of its task runs; if any task run failed, the flow run was marked as failed.

In Prefect 3.0, the final state of a flow run is entirely determined by:

1. The `return` value of the flow function (same as in Prefect 2.0):
   - Literal values are considered successful.
   - Any explicit `State` that is returned will be considered the final state of the flow run. If an iterable of `State` objects is returned, all must be `Completed` for the flow run to be considered `Completed`. If any are `Failed`, the flow run will be marked as `Failed`.

2. Whether the flow function allows an exception to `raise`:
   - Exceptions that are allowed to propagate will result in a `Failed` state.
   - Exceptions suppressed with `raise_on_failure=False` will not affect the flow run state.

This change means that task failures within a flow do not automatically cause the flow run to fail unless they affect the flow's return value or raise an uncaught exception.

<Warning>
When migrating from Prefect 2.0 to Prefect 3, be aware that flows may now complete successfully even if they contain failed tasks, unless you explicitly handle task failures.
</Warning>

To ensure your flow fails when critical tasks fail, consider these approaches:

1. Allow task exceptions to propagate by not using `raise_on_failure=False`.
2. Use `return_state=True` and explicitly check task states to conditionally `raise` the underlying exception or return a failed state.
3. Use try/except blocks to handle task failures and return appropriate states.

#### Examples

<CodeGroup>
```python Allow Unhandled Exceptions
from prefect import flow, task

@task
def failing_task():
    raise ValueError("Task failed")

@flow
def my_flow():
    failing_task()  # Exception propagates, causing flow failure

try:
    my_flow()
except ValueError as e:
    print(f"Flow failed: {e}")  # Output: Flow failed: Task failed
```

```python Use return_state
from prefect import flow, task
from prefect.states import Failed

@task
def failing_task():
    raise ValueError("Task failed")

@flow
def my_flow():
    state = failing_task(return_state=True)
    if state.is_failed():
        raise ValueError(state.result())
    return "Flow completed successfully"

try:
    print(my_flow())
except ValueError as e:
    print(f"Flow failed: {e}")  # Output: Flow failed: Task failed
```

```python Use try/except
from prefect import flow, task
from prefect.states import Failed

@task
def failing_task():
    raise ValueError("Task failed")

@flow
def my_flow():
    try:
        failing_task()
    except ValueError:
        return Failed(message="Flow failed due to task failure")
    return "Flow completed successfully"

print(my_flow())  # Output: Failed(message='Flow failed due to task failure')
```
</CodeGroup>

Choose the strategy that best fits your specific use case and error handling requirements.

-----

### Futures interface

<info>
This change affects you if: you directly interact with `PrefectFuture` objects.
</info>

PrefectFutures now have a standard synchronous interface, with an asynchronous one [planned soon](https://github.com/PrefectHQ/prefect/issues/15008).

### Automatic task caching

<info>
This change affects you if: you rely on side effects in your tasks
</info>

Prefect 3.0 introduces a powerful idempotency engine. By default, tasks in a flow run are automatically cached if they are called more than once with the same inputs. If you rely on tasks with side effects, this may result in surprising behavior. To disable caching, pass `cache_policy=None` to your task.

### Unmapped mutable objects in mapped tasks

<info>
This change affects you if: you use `unmapped()` to pass mutable objects (like lists or dicts) to mapped tasks.
</info>

In Prefect 2.0, when passing a mutable object (such as a list or dict) using `unmapped()` to a mapped task, each task instance appeared to receive an independent copy of the object. In Prefect 3.0, the unmapped object is shared by reference across all mapped task runs. If any task mutates the object, all other concurrently running tasks will see those mutations, potentially causing race conditions and unexpected behavior.

<Warning>
When migrating from Prefect 2.0 to Prefect 3, be aware that mutable objects passed via `unmapped()` are now shared across all mapped tasks. Mutating these objects can lead to race conditions.
</Warning>

To avoid this issue, create a copy of the mutable object within each task before modifying it:

<CodeGroup>
```python Problematic (Prefect 3.0)
from prefect import flow, task, unmapped

@task
def process_item(item: str, shared_state: dict):
    # This mutation affects ALL tasks!
    shared_state["current_item"] = item
    # Race condition: another task may overwrite this before we read it
    print(f"Processing {shared_state['current_item']}")

@flow
def my_flow():
    shared_state = {"current_item": None}

    # All tasks share the same dict reference
    process_item.map(
        item=["A", "B", "C"],
        shared_state=unmapped(shared_state)
    )
```

```python Recommended (Prefect 3.0)
from prefect import flow, task, unmapped
from copy import deepcopy

@task
def process_item(item: str, shared_state: dict):
    # Create a copy for this task
    local_state = deepcopy(shared_state)
    local_state["current_item"] = item
    print(f"Processing {local_state['current_item']}")

@flow
def my_flow():
    shared_state = {"current_item": None}

    process_item.map(
        item=["A", "B", "C"],
        shared_state=unmapped(shared_state)
    )
```

```python Alternative: Pass immutable values
from prefect import flow, task, unmapped

@task
def process_item(item: str, config_value: str):
    # Immutable strings are safe to share
    print(f"Processing {item} with config: {config_value}")

@flow
def my_flow():
    config = "production"

    process_item.map(
        item=["A", "B", "C"],
        config_value=unmapped(config)
    )
```
</CodeGroup>

If you need to share read-only configuration or settings across tasks, consider using immutable types (strings, tuples, frozen dataclasses) instead of mutable objects.

### Workers

<info>
This change affects you if: you're using agents from an early version of Prefect 2.0.
</info>

In Prefect 2.0, agents were deprecated in favor of next-generation workers. Workers are now standard in Prefect 3. For detailed information on upgrading from agents to workers, please refer to our [upgrade guide](https://docs-3.prefect.io/v3/resources/upgrade-agents-to-workers).

### Resolving common gotchas

#### `AttributeError: 'coroutine' object has no attribute <some attribute>`

When within an asynchronous task or flow context, if you do **not** `await` an asynchronous function or method, this error will be raised when you try to use the object.

To fix it, `await` the asynchronous function or method or use `_sync=True`.

For example, `Block`'s `load` method is asynchronous in an async context:
<CodeGroup>
```python Incorrect
from prefect.blocks.system import Secret

async def my_async_function():
    my_secret = Secret.load("my-secret")
    print(my_secret.get()) # AttributeError: 'coroutine' object has no attribute 'get'
```

```python Correct
from prefect.blocks.system import Secret

async def my_async_function():
    my_secret = await Secret.load("my-secret")
    print(my_secret.get()) # This will work
```

```python Also Correct
from prefect.blocks.system import Secret

async def my_async_function():
    my_secret = Secret.load("my-secret", _sync=True)
    print(my_secret.get()) # This will work
```

</CodeGroup>

Similarly, if you never use an un-awaited coroutine, you may see a warning like this:

```python
RuntimeWarning: coroutine 'some_async_callable' was never awaited
...
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
```

This is the same problem as above, and you may `await` the coroutine to fix it or use `_sync=True`.

#### `TypeError: object <some Type> can't be used in 'await' expression`

This error occurs when using the `await` keyword before an object that is not a coroutine.

To fix it, remove the `await`.

For example, `my_task.submit(...)` is _always_ synchronous in Prefect 3.x:

<CodeGroup>
```python Incorrect
from prefect import flow, task

@task
async def my_task():
    pass

@flow
async def my_flow():
    future = await my_task.submit() # TypeError: object PrefectConcurrentFuture can't be used in 'await' expression
```

```python Correct
from prefect import flow, task

@task
async def my_task():
    pass

@flow
async def my_flow():
    future = my_task.submit() # This will work
```
</CodeGroup>

See the [Futures interface section](#futures-interface) for more information on this particular gotcha.

#### `TypeError: Flow.deploy() got an unexpected keyword argument 'schedule'`

In Prefect 3.0, the `schedule` argument has been removed in favor of the `schedules` argument.

This applies to both the `Flow.serve` and `Flow.deploy` methods.

<CodeGroup>
```python Prefect 2.0 {11}
from datetime import timedelta
from prefect import flow
from prefect.client.schemas.schedules import IntervalSchedule

@flow
def my_flow():
    pass

my_flow.serve(
    name="my-flow",
    schedule=IntervalSchedule(interval=timedelta(minutes=1))
)
```

```python Prefect 3.0 {11}
from datetime import timedelta
from prefect import flow
from prefect.schedules import Interval

@flow
def my_flow():
    pass

my_flow.serve(
    name="my-flow",
    schedules=[Interval(timedelta(minutes=1))]
)
```
</CodeGroup>
